package com.hzya.frame;

import org.apache.nifi.processor.AbstractProcessor;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * @Author：liuyang
 * @Package：com.hzya.frame
 * @Project：nifi-hzyadev-bundle
 * @name：VoucherConversion
 * @Date：2025/7/30 09:30
 * @Filename：VoucherConversion
 */
@Tags({"credential", "transform", "custom"})
@CapabilityDescription("自定义处理器用于处理凭证转换逻辑，支持MySQL数据库查询")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class VoucherConversionProcessor extends AbstractProcessor {
    // 数据库连接池服务属性
    public static final PropertyDescriptor DATABASE_CONNECTION_POOLING_SERVICE = new PropertyDescriptor.Builder().name("database-connection-pooling-service").displayName("Database Connection Pooling Service").description("数据库连接池服务").required(true).identifiesControllerService(DBCPService.class).build();

    // 查询SQL属性
    public static final PropertyDescriptor QUERY_SQL = new PropertyDescriptor.Builder().name("query-sql").displayName("Query SQL").description("查询凭证信息的SQL语句").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("SELECT username, password, token FROM credentials WHERE user_id = ?").build();

    // 更新SQL属性
    public static final PropertyDescriptor UPDATE_SQL = new PropertyDescriptor.Builder().name("update-sql").displayName("Update SQL").description("更新凭证信息的SQL语句").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("UPDATE credentials SET last_access_time = NOW() WHERE user_id = ?").build();

    // 关系定义
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("成功处理的FlowFile").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("处理失败的FlowFile").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(DATABASE_CONNECTION_POOLING_SERVICE);
        descriptors.add(QUERY_SQL);
        descriptors.add(UPDATE_SQL);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // 处理器调度时的初始化逻辑
        getLogger().info("CredentialTransformProcessor scheduled");
    }

    @OnStopped
    public void onStopped() {
        // 处理器停止时的清理逻辑
        getLogger().info("CredentialTransformProcessor stopped");
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            // 获取数据库连接池服务
            final DBCPService dbcpService = context.getProperty(DATABASE_CONNECTION_POOLING_SERVICE).asControllerService(DBCPService.class);

            // 获取SQL语句
            final String querySQL = context.getProperty(QUERY_SQL).getValue();
            final String updateSQL = context.getProperty(UPDATE_SQL).getValue();

            // 执行数据库操作示例
            executeCredentialTransform(dbcpService, querySQL, updateSQL, flowFile);

            // 这里可以添加你的凭证转换业务逻辑
            // processCredentialLogic(flowFile, session);

            // 转移到成功关系
            session.transfer(flowFile, REL_SUCCESS);
            getLogger().info("Successfully processed FlowFile {}", new Object[]{flowFile});

        } catch (Exception e) {
            getLogger().error("Failed to process FlowFile {}", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * 执行凭证转换相关的数据库操作示例
     */
    private void executeCredentialTransform(DBCPService dbcpService, String querySQL, String updateSQL, FlowFile flowFile) throws SQLException {

        Connection connection = null;
        PreparedStatement queryStmt = null;
        PreparedStatement updateStmt = null;
        ResultSet resultSet = null;

        try {
            // 获取数据库连接
            connection = dbcpService.getConnection();

            // 示例：从FlowFile属性中获取用户ID（实际场景中可能从FlowFile内容中解析）
            String userId = flowFile.getAttribute("user_id");
            if (userId == null) {
                userId = "default_user"; // 默认值或从其他地方获取
            }

            // 执行查询操作
            queryStmt = connection.prepareStatement(querySQL);
            queryStmt.setString(1, userId);
            resultSet = queryStmt.executeQuery();

            // 处理查询结果
            while (resultSet.next()) {
                String username = resultSet.getString("username");
                String password = resultSet.getString("password");
                String token = resultSet.getString("token");

                getLogger().info("Retrieved credential for user: {}, username: {}", new Object[]{userId, username});

                // 这里可以添加凭证转换逻辑
                // String transformedCredential = transformCredential(username, password, token);
            }

            // 执行更新操作（如果提供了更新SQL）
            if (updateSQL != null && !updateSQL.trim().isEmpty()) {
                updateStmt = connection.prepareStatement(updateSQL);
                updateStmt.setString(1, userId);
                int updatedRows = updateStmt.executeUpdate();
                getLogger().info("Updated {} rows for user {}", new Object[]{updatedRows, userId});
            }

            // 示例：执行其他SQL操作
            executeSampleQueries(connection, userId);

        } finally {
            // 关闭资源
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) { /* ignore */ }
            }
            if (queryStmt != null) {
                try {
                    queryStmt.close();
                } catch (SQLException e) { /* ignore */ }
            }
            if (updateStmt != null) {
                try {
                    updateStmt.close();
                } catch (SQLException e) { /* ignore */ }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) { /* ignore */ }
            }
        }
    }

    /**
     * 执行示例查询的方法
     */
    private void executeSampleQueries(Connection connection, String userId) throws SQLException {

        // 示例1：查询用户权限
        String permissionQuery = "SELECT permission_name FROM user_permissions WHERE user_id = ?";
        try (PreparedStatement stmt = connection.prepareStatement(permissionQuery)) {
            stmt.setString(1, userId);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    String permission = rs.getString("permission_name");
                    getLogger().debug("User {} has permission: {}", new Object[]{userId, permission});
                }
            }
        }

        // 示例2：插入审计日志
        String auditInsert = "INSERT INTO audit_log (user_id, operation, operation_time) VALUES (?, ?, NOW())";
        try (PreparedStatement stmt = connection.prepareStatement(auditInsert)) {
            stmt.setString(1, userId);
            stmt.setString(2, "credential_transform");
            int inserted = stmt.executeUpdate();
            getLogger().debug("Inserted {} audit log entry for user {}", new Object[]{inserted, userId});
        }

        // 示例3：批量操作
        String batchUpdate = "UPDATE user_session SET last_activity = NOW() WHERE user_id = ?";
        try (PreparedStatement stmt = connection.prepareStatement(batchUpdate)) {
            // 可以添加到批次中
            stmt.setString(1, userId);
            stmt.addBatch();

            // 执行批次
            int[] results = stmt.executeBatch();
            getLogger().debug("Batch update results: {} for user {}", new Object[]{results.length, userId});
        }
    }

    /**
     * 示例凭证转换逻辑（占位方法）
     */
    private String transformCredential(String username, String password, String token) {
        // 这里添加你的具体凭证转换逻辑
        // 例如：加密、解密、格式转换等

        getLogger().debug("Transforming credential for username: {}", username);

        // 示例转换逻辑
        return "transformed_" + token;
    }

    /**
     * 处理凭证相关业务逻辑的示例方法
     */
    private void processCredentialLogic(FlowFile flowFile, ProcessSession session) {
        // 这里可以添加更多的凭证处理逻辑
        // 例如：
        // 1. 解析FlowFile内容
        // 2. 验证凭证格式
        // 3. 执行转换操作
        // 4. 更新FlowFile属性

        getLogger().info("Processing credential logic for FlowFile: {}", flowFile);
    }
}
